package org.example.nebula.local

import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.algorithm.config.{CcConfig, CoefficientConfig, LPAConfig, LouvainConfig, PRConfig}
import com.vesoft.nebula.algorithm.lib.{ClusteringCoefficientAlgo, ConnectedComponentsAlgo, DegreeStaticAlgo, LabelPropagationAlgo, LouvainAlgo, PageRankAlgo, StronglyConnectedComponentsAlgo, TriangleCountAlgo}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.example.nebula.basic.ReadData
import org.slf4j.LoggerFactory

object RunAlgo {

  // Logger named after this object. `val` (not `var`): it is never reassigned.
  val log = LoggerFactory.getLogger(this.getClass.getSimpleName)

  /**
   * Entry point: builds a local 3-thread Spark session, reads the graph
   * (edge-list) data from CSV, and runs each graph algorithm in turn,
   * logging the elapsed time of every run.
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Thrift's TCompactProtocol is not Kryo-registered by default.
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .appName("nebula-algo")
      .master("local[3]")
      .config(sparkConf)
      .getOrCreate()

    // Read the graph data (edge list) from file.
    val csvDF = ReadData.readCsvData(spark)

    pageRankAlgo(spark, csvDF)
    clusteringCoefficientAlgo(spark, csvDF)
    degreeStaticAlgo(spark, csvDF)
    louvainAlgo(spark, csvDF)
    labelPropagationAlgo(spark, csvDF)
    stronglyConnectedComponentsAlgo(spark, csvDF)
    triangleCountAlgo(spark, csvDF)
    connectedComponentsAlgo(spark, csvDF)

    spark.stop()
  }

  /**
   * Runs `body`, logs its wall-clock duration in milliseconds, and returns
   * its result. Extracted to remove the nanoTime/log.warn boilerplate that
   * was duplicated (with the `star` typo) in every algorithm method below.
   * `body` is by-name so evaluation happens inside the timed region.
   */
  private def timed[A](body: => A): A = {
    val start  = System.nanoTime()
    val result = body
    val end    = System.nanoTime()
    // "消耗时间" = "elapsed time"; message kept identical to the original output.
    log.warn("消耗时间" + (end - start) / 1000000 + "ms")
    result
  }

  /**
   * PageRank. NOTE(review): PRConfig(3, 0.85) presumably means 3 iterations
   * with damping/reset 0.85 — confirm against PRConfig; `false` = directed.
   */
  def pageRankAlgo(spark: SparkSession, df: DataFrame): Unit = {
    val ranks = timed(PageRankAlgo.apply(spark, df, PRConfig(3, 0.85), false))
    ranks.show()
  }

  /** Clustering coefficient in "local" (per-vertex) mode. */
  def clusteringCoefficientAlgo(spark: SparkSession, df: DataFrame): Unit = {
    val coefficients =
      timed(ClusteringCoefficientAlgo.apply(spark, df, new CoefficientConfig("local")))
    coefficients.show()
  }

  /** Degree statistics; takes no algorithm-specific configuration. */
  def degreeStaticAlgo(spark: SparkSession, df: DataFrame): Unit = {
    val degrees = timed(DegreeStaticAlgo.apply(spark, df))
    degrees.show()
  }

  /**
   * Louvain community detection. NOTE(review): LouvainConfig(10, 5, 0.5) is
   * presumably (maxIter, internalIter, tolerance) — confirm against LouvainConfig.
   */
  def louvainAlgo(spark: SparkSession, df: DataFrame): Unit = {
    val communities = timed(LouvainAlgo.apply(spark, df, LouvainConfig(10, 5, 0.5), false))
    communities.show()
  }

  /** Label propagation; LPAConfig(10) presumably caps iterations at 10. */
  def labelPropagationAlgo(spark: SparkSession, df: DataFrame): Unit = {
    val labels = timed(LabelPropagationAlgo.apply(spark, df, LPAConfig(10), false))
    labels.show()
  }

  /**
   * Strongly connected components. Int.MaxValue as the CcConfig bound means
   * the algorithm effectively iterates until convergence.
   */
  def stronglyConnectedComponentsAlgo(spark: SparkSession, df: DataFrame): Unit = {
    val components =
      timed(StronglyConnectedComponentsAlgo.apply(spark, df, CcConfig(Int.MaxValue), false))
    components.show()
  }

  /** Triangle count; takes no algorithm-specific configuration. */
  def triangleCountAlgo(spark: SparkSession, df: DataFrame): Unit = {
    val triangles = timed(TriangleCountAlgo.apply(spark, df))
    triangles.show()
  }

  /** Connected components, bounded at 20 iterations (CcConfig(20)). */
  def connectedComponentsAlgo(spark: SparkSession, df: DataFrame): Unit = {
    val components = timed(ConnectedComponentsAlgo.apply(spark, df, CcConfig(20), false))
    components.show()
  }

}
